Merge branch nadav/feature-split-merges into main#6461
Conversation
Make MergeSchedulerService optional
* Spawn merge pipeline from new compactor * new line because adrien is annoyed about it * comment
* Wire up compactor service * lints
* Implement compactor pipeline update logic * lints and fixes * comments, lints, test fixes, other things
Refactor migrations
…xplicitly enabled (#6453) * Revert to using existing merge flow when standalone compactors isnt explicitly enabled * Pull out some configs
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e60210f3d1
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| self.indexer_config.enable_standalone_compactors = | ||
| self.enable_standalone_compactors.resolve(env_vars)?; |
There was a problem hiding this comment.
Preserve indexer standalone-compactor flag from config
build_and_validate unconditionally overwrites indexer.enable_standalone_compactors from a separate top-level ConfigValue, whose default is false; this means setting indexer.enable_standalone_compactors: true in the indexer section is silently ignored unless QW_ENABLE_STANDALONE_COMPACTORS (or the top-level field) is also set. In practice, operators enabling standalone compaction through the documented indexer config will still run in legacy mode and may never start external compactor flows.
Useful? React with 👍 / 👎.
| .wait_for(COMPACTION_SERVICE_DISCOVERY_TIMEOUT, |connections| { | ||
| !connections.is_empty() | ||
| }) |
There was a problem hiding this comment.
Validate planner endpoint, not just janitor presence
On compactor-only nodes, discovery succeeds as soon as any janitor connection exists, but this does not guarantee that the janitor exposes the compaction planner RPC (for example, janitor nodes running with standalone compactors disabled). In that case startup proceeds with a client to a node that cannot serve report_status, causing repeated runtime failures instead of a clear startup error.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ce469888b6
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let indexing_split_cache: Arc<IndexingSplitCache> = if node_config | ||
| .is_service_enabled(QuickwitService::Indexer) | ||
| && node_config.is_service_enabled(QuickwitService::Compactor) |
There was a problem hiding this comment.
Enable split cache when indexer runs without compactor
This condition now falls back to IndexingSplitCache::no_caching() unless both indexer and compactor services are enabled, which disables the indexer split cache in common indexer-only deployments. Before this change, IndexingService::new always opened the cache from indexer_config; with the new wiring, legacy merge pipelines on indexer-only nodes repeatedly re-download splits instead of reusing local cache, causing a significant performance and bandwidth regression.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7054e1d339
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| if !has_healthy { | ||
| self.record_pipeline_duration(); | ||
| info!(task_id=%self.task_id, "all compaction pipeline actors completed"); | ||
| self.status = PipelineStatus::Completed; | ||
| self.record_terminal_metrics(true); |
There was a problem hiding this comment.
Mark one-shot compaction pipelines without actor termination
This completion test only succeeds when every child actor reports Health::Success, but the downloader/executor/packager/uploader/publisher actors spawned for a compaction task are the normal long-lived actors and do not exit after processing a single merge message. After a successful publish they therefore stay Healthy until the progress heartbeat check eventually marks idle actors unhealthy, so the task is never reported as Completed (and may be reported failed), leaking compactor slots and preventing the planner from receiving successful acknowledgements.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
https://github.com/quickwit-oss/quickwit/blob/a5b1b012c6c24b0d055c40c7f6bee2f2ab2c8d7f/quickwit-compaction/src/planner/compaction_state.rs#L143-L145
Drop splits once they age out of compaction
When a tracked split reaches its maturity timestamp before it can form a merge operation (for example, a low-volume partition that never reaches the merge factor), merge_policy.operations(splits) leaves that mature split in splits, so this branch keeps the partition and its ID in needs_compaction_split_ids forever. Those stale IDs continue to be counted/excluded on every scan and can eventually consume the MAX_EXCLUDED_SPLIT_IDS budget, causing later immature splits to be hidden behind already-tracked rows instead of being discovered.
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
954d2d8 to
82fa2be
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 82fa2bef62
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| for split_id in &task.split_ids { | ||
| self.in_flight_split_ids.insert(split_id.clone()); | ||
| self.needs_compaction_split_ids.remove(split_id.as_str()); |
There was a problem hiding this comment.
Remove adopted heartbeat splits from pending vectors
When the planner has already scanned a split into needs_compaction and later receives an in-progress heartbeat for an unknown task (the recovery case described here), this only removes the split ID from needs_compaction_split_ids; the SplitMetadata remains in the needs_compaction vector. plan_partition operates on that vector, so the next planning tick can enqueue a second merge for the same split while the worker is still reporting it in progress. Remove the matching metadata from needs_compaction as well, or make planning skip IDs that are already in flight.
Useful? React with 👍 / 👎.
| if query | ||
| .excluded_split_ids | ||
| .iter() | ||
| .any(|excluded| excluded == &split.split_metadata.split_id) | ||
| { |
There was a problem hiding this comment.
Apply exclusions before immature split early returns
For compaction planner scans against the file-backed metastore, queries combine retain_immature with excluded_split_ids, but split_query_predicate returns from the maturity check above before reaching this new exclusion block. Once the first SCAN_PAGE_SIZE immature splits are already tracked, list_splits keeps returning that same excluded page, ingest_splits skips it, and the planner never advances to later splits. Move this exclusion ahead of the maturity return path or avoid returning early there.
Useful? React with 👍 / 👎.
| let Some(merge_scheduler_service) = self.merge_scheduler_service_opt.clone() else { | ||
| return Ok(None); |
There was a problem hiding this comment.
Keep Parquet merge pipelines until compactors support them
When enable_standalone_compactors disables the local merge scheduler, this also returns None for metrics/Parquet indexes, so their publishers no longer feed a ParquetMergePlanner. The new standalone compactor only plans MergeOperations from the regular splits table and does not process metrics_splits/ParquetMergeTasks, so metrics indexes in standalone mode will accumulate unmerged Parquet splits indefinitely. Keep the Parquet merge pipeline active or add equivalent standalone Parquet compaction before dropping this mailbox.
Useful? React with 👍 / 👎.
| // `MergePolicy::operations` emits at most one op per level per call, which under a backlog | ||
| // leaves the bulk of `splits` untouched per tick. Loop until no new operations are created. | ||
| loop { | ||
| let operations = merge_policy.operations(splits); |
There was a problem hiding this comment.
Run finalize merge policy in standalone compaction
With standalone compactors enabled, indexers no longer spawn local merge pipelines, so source shutdown no longer sends RunFinalizeMergePolicyAndQuit; this planner path only calls operations(). For finite or cold sources that rely on max_finalize_merge_operations, the final merge policy is never applied and straggler splits below the normal merge factor remain unmerged. Add a finalize path in the standalone planner or keep local finalization for those sources.
Useful? React with 👍 / 👎.
| ingester_opt: Option<Ingester>, | ||
|
|
||
| pub compaction_service_client_opt: Option<CompactionPlannerServiceClient>, | ||
| pub _compactor_supervisor_opt: Option<Mailbox<CompactorSupervisor>>, |
There was a problem hiding this comment.
Include compactor supervisor in liveness checks
This stores the compactor supervisor separately, but the health routes still only receive the indexer and janitor mailboxes, and CompactorSupervisor has no Healthz wiring. On compactor-only nodes, if the supervisor actor fails, /health/livez continues to return healthy because no enabled service is checked, leaving orchestration unaware that compaction has stopped. Wire the supervisor into the liveness path like the indexer/janitor services.
Useful? React with 👍 / 👎.
| let planner = CompactionPlanner::new(metastore_client.clone()); | ||
| let (mailbox, handle) = universe.spawn_builder().spawn(planner); |
There was a problem hiding this comment.
Elect a single compaction planner per cluster
In clusters with more than one janitor, this starts an independent in-memory CompactionPlanner on every janitor, while compactor-only nodes use a balanced Janitor channel for report_status. A task can be assigned by one planner and reported to another, and each planner scans the same Published immature splits without shared state, so duplicate compactions and spurious timeouts/reassignments are likely in the default multi-janitor topology. Use a single elected planner or persist/shard assignment state before enabling multiple planners.
Useful? React with 👍 / 👎.
| let mut doc_mappers = HashMap::new(); | ||
| doc_mappers.insert(*doc_mapping_uid, doc_mapper); |
There was a problem hiding this comment.
Do not alias current doc mapper to old split UIDs
When an index has old immature splits from a previous doc mapping, get_for_split passes that old doc_mapping_uid here, but this code fetches only the current index metadata and stores the current doc mapper under the old UID. The resulting assignment serializes self.config.doc_mapping, so standalone compactors can merge old-schema splits with the new schema after a mapping update. Verify that the fetched config's UID matches the split UID, or retain/fetch the historical doc mapping for that UID.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3220d844d2
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let operations = merge_policy.operations(splits); | ||
| if operations.is_empty() { | ||
| break; |
There was a problem hiding this comment.
Drop tracked splits once they become mature
For sparse sources or partitions that never reach the merge factor, a split can age past its maturation period while sitting in needs_compaction. The merge policies skip mature splits by leaving them in splits, so operations() returns empty here and the later is_empty() cleanup never removes their IDs from needs_compaction_split_ids; those stale IDs are then excluded on every metastore scan and can grow unbounded on long-running planners. Re-evaluate and remove now-mature splits from the tracked set before breaking out.
Useful? React with 👍 / 👎.
| spawn_ctx: &SpawnContext, | ||
| ) -> anyhow::Result<()> { | ||
| let source_uid_label = source_uid_metrics_label( | ||
| assignment.index_uid.as_ref().unwrap(), |
There was a problem hiding this comment.
Validate assignment index UID before using it
When a compactor receives a MergeTaskAssignment without index_uid (the proto field is optional on the wire, and build_compaction_pipeline already treats this as a recoverable error), this unwrap runs before that validation and panics the supervisor while building metric labels. A malformed planner response should be logged as a failed spawn or rejected through the existing ok_or_else path rather than taking the compactor actor down.
Useful? React with 👍 / 👎.
| match self.indexes.get(index_uid) { | ||
| Some(entry) if entry.doc_mappers.contains_key(doc_mapping_uid) => {} | ||
| _ => self.fetch_index_config(index_uid, doc_mapping_uid).await?, |
There was a problem hiding this comment.
Refresh cached index settings after metadata updates
If an operator updates an index's indexing_settings or retention_policy without changing the doc mapping UID, this cache hit keeps returning the old IndexEntry forever. The standalone planner then continues planning with the stale merge policy and sends compactors the stale retention policy until the janitor restarts, even though update_index supports changing these fields at runtime; cache on an index metadata version or periodically refetch instead of keying freshness only on doc_mapping_uid.
Useful? React with 👍 / 👎.
| /// Maximum number of concurrent split uploads across all pipelines. | ||
| #[serde(default = "CompactorConfig::default_max_concurrent_split_uploads")] | ||
| pub max_concurrent_split_uploads: usize, |
There was a problem hiding this comment.
Reject zero compactor upload concurrency
When compactor.max_concurrent_split_uploads is set to 0, deserialization accepts it and the compactor later constructs merge uploaders with a zero-permit semaphore, so the first packaged split waits forever for an upload permit and the task remains in progress with its slot stuck. Make this NonZeroUsize or add validation so a bad compactor config fails at startup instead of deadlocking compaction.
Useful? React with 👍 / 👎.
| let max_concurrent_split_uploads_index = (self.max_concurrent_split_uploads / 2).max(1); | ||
| let max_concurrent_split_uploads_merge = | ||
| (self.max_concurrent_split_uploads - max_concurrent_split_uploads_index).max(1); | ||
| self.max_concurrent_split_uploads - max_concurrent_split_uploads_index; |
There was a problem hiding this comment.
that feels like something which can now go to zero if max_concurrent_split_uploads=1 on the "there is a merge_scheduler" path, and cause issues down the line
There was a problem hiding this comment.
Great catch, you're right, this shouldn't have changed
There was a problem hiding this comment.
(i'm not sure if that shouldn't have changed, the "there is no merge_scheduler path" might want its zero)
| -- The partial predicate is restricted to split_state = 'Published' because | ||
| -- partial-index predicates must be IMMUTABLE; "now()" cannot appear here. |
There was a problem hiding this comment.
i'm not entirely convinced by the use of a partial index here. Moving from Staged->Published and Published->MarkedForDeletion both need to write to this index, and Published should be the majority of splits at any given point in time, i don't think having a partial index here gains us much
| /// Atomicity is per-migration, not per-run: a failure on migration N leaves | ||
| /// migrations 1..N-1 applied and committed in `_sqlx_migrations`. The | ||
| /// operator fixes the failing migration and re-runs. |
There was a problem hiding this comment.
iirc this used to not be the property we settled upon, but i think this is a better level of atomicity and rollback capabilities. If a migrations worked there is no reason to rollback because the next one failed
| // One bind regardless of list length: avoids postgres' 65535 param | ||
| // ceiling and keeps parse-time O(1) in the exclude size. The `$1` is |
There was a problem hiding this comment.
❤️
there are other places we should do that i think, i recall seeing queries with dozens (hundreds?) of parameter, for split deletion by id or something like that
(not something to do in this PR, but we should track that somewhere)
Description
Merges the split compaction feature branch into main.
Split compaction is gated by an env var and a config flag on the indexer config. It's non-intrusive: when it's off, everything remains as is- indexer nodes spawn merge pipelines and merge their own splits.
Standalone compactors (mergers) is experimental for now, and opt-in. It's configurable and there will be additional changes to it as usage of it increases.
How was this PR tested?
Testing on a cluster receiving a moderate amount of traffic. Unit tests and integration tests as needed.